package com.chenjl.trace.stream.topology;

import java.util.Map;

import com.chenjl.trace.stream.util.Constant;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import lombok.extern.slf4j.Slf4j;
/**
 * Spout that feeds a fixed list of sentences to downstream bolts, one sentence
 * per {@link #nextTuple()} call, pausing briefly after each emission.
 * Once every sentence has been sent, {@link #nextTuple()} emits nothing further.
 * 2018-8-16 20:02:12
 * @author chenjinlong
 */
@Slf4j
public class SentencesRichSpout implements IRichSpout {
	private static final long serialVersionUID = 1L;

	/** Pause, in milliseconds, taken after each emitted sentence. */
	private static final long EMIT_INTERVAL_MILLIS = 1000L;

	/** The sentences to emit, in order. */
	private static final String[] SENTENCES = {
			"Apache Storm is a free and open source distributed realtime computation system",
			"Storm makes it easy to reliably process unbounded streams of data",
			"doing for realtime processing what Hadoop did for batch processing",
			"Storm is simple",
			"can be used with any programming language",
			"and is a lot of fun to use" };

	/** Collector handed over in {@link #open}; used to emit tuples. */
	private SpoutOutputCollector spoutOutputCollector;

	/** Position of the next sentence to emit; also the number of sentences sent so far. */
	private int index = 0;


	/**
	 * Declares the single output field carrying the sentence text.
	 *
	 * @param outputFieldsDeclarer declarer used to register the output schema
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		log.info("SentencesRichSpout declareOutputFields~~");

		// Name of the field passed downstream.
		outputFieldsDeclarer.declare(new Fields(Constant.SENTENCES_FIELD));
	}

	/**
	 * Stores the collector so that {@link #nextTuple()} can emit through it.
	 *
	 * @param stormConf            configuration map (unused here)
	 * @param topologyContext      topology context (unused here)
	 * @param spoutOutputCollector collector used for emitting tuples
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map stormConf,TopologyContext topologyContext,SpoutOutputCollector spoutOutputCollector) {
		log.info("IRichSpout open~~");
		this.spoutOutputCollector = spoutOutputCollector;
	}

	/**
	 * Emits the next sentence, if any remain, then sleeps for
	 * {@link #EMIT_INTERVAL_MILLIS} to throttle the output rate.
	 * Returns immediately without emitting once all sentences have been sent.
	 */
	@Override
	public void nextTuple() {
		if(index >= SENTENCES.length) {
			// Nothing left to send.
			return;
		}

		String sentence = SENTENCES[index];
		index++;
		log.info("IRichSpout nextTuple~~, send sentence : {} ",sentence);
		// Emitted without a message id.
		// NOTE(review): presumably ack/fail are therefore never invoked for these tuples - confirm against Storm's reliability docs.
		spoutOutputCollector.emit(new Values(sentence));

		try {
			Thread.sleep(EMIT_INTERVAL_MILLIS);
		}
		catch (InterruptedException e) {
			// Restore the interrupt flag so the calling thread can observe the interruption
			// instead of having it silently swallowed here.
			Thread.currentThread().interrupt();
			log.warn("IRichSpout nextTuple interrupted while sleeping",e);
		}
	}

	/**
	 * Logs the acknowledged message id; no other action is taken.
	 *
	 * @param msgId id of the acknowledged message
	 */
	@Override
	public void ack(Object msgId) {
		log.info("IRichSpout ack~~ : {}",msgId);
	}

	/**
	 * Logs the failed message id; the message is not replayed.
	 *
	 * @param msgId id of the failed message
	 */
	@Override
	public void fail(Object msgId) {
		log.info("IRichSpout fail~~ : {}",msgId);
	}

	/** Logs how many sentences were sent by this instance. */
	@Override
	public void close() {
		log.info("SentencesRichSpout close，发送句子次数：{}",index);
	}

	/** No-op. */
	@Override
	public void activate() {

	}

	/** No-op. */
	@Override
	public void deactivate() {

	}

	/**
	 * @return {@code null}; this spout supplies no configuration map
	 */
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}